test(router): entity-splitter module POC — split _entities fetches and fan out in parallel#2775
test(router): entity-splitter module POC — split _entities fetches and fan out in parallel#2775jensneuse wants to merge 1 commit into
Conversation
…es fan-out Custom router module that intercepts outgoing _entities subgraph fetches via EnginePreOriginHandler.OnOriginRequest, splits the representations array into batches, fetches each batch in parallel, and merges the responses back into a single synthetic *http.Response that short-circuits the real origin call. Demonstrates feasibility of request-level batching without engine changes. Implementation notes: - astjson parser + arena pool for parsing incoming bodies - simple body-size threshold for triggering splits - OTel trace context propagation on fan-out sub-requests - order-preserving merge with null-fill + reindexed error paths on failures Integration tests (router-tests/modules/entity_splitter_test.go): - 100 reps split into 10 parallel fetches of 10, exact-order assertion - small request below threshold passes through unsplit - one failed chunk: data carries nulls at the right positions and the engine surfaces the per-position errors under extensions.errors Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
WalkthroughThe changes introduce a new entity-splitter module for router-tests that intercepts large Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (2.11.4)level=error msg="[linters_context] typechecking error: pattern ./...: directory prefix . does not contain main module or its selected dependencies" Comment |
Router-nonroot image scan passed✅ No security vulnerabilities found in image: |
Router image scan passed✅ No security vulnerabilities found in image: |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
router-tests/modules/entity-splitter/module.go (2)
291-299: Response body is read before status check, wasting I/O on errors.The body is fully read (line 291) before checking status code (line 296). For non-200 responses, this reads potentially large error bodies unnecessarily.
♻️ Minor optimization
resp, err := m.Client.Do(subReq) if err != nil { out.fetchErr = fmt.Errorf("sub-fetch: %w", err) return } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + out.fetchErr = fmt.Errorf("sub-fetch returned status %d", resp.StatusCode) + return + } + bodyBytes, err := io.ReadAll(resp.Body) if err != nil { out.fetchErr = fmt.Errorf("read sub-response: %w", err) return } - if resp.StatusCode != http.StatusOK { - out.fetchErr = fmt.Errorf("sub-fetch returned status %d", resp.StatusCode) - return - }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router-tests/modules/entity-splitter/module.go` around lines 291 - 299, The code reads resp.Body into bodyBytes before checking resp.StatusCode, causing unnecessary I/O for non-200 responses; change the logic in the fetch handling (the block using resp, bodyBytes and out.fetchErr) to first inspect resp.StatusCode and set out.fetchErr for non-OK statuses without reading the full body, and only call io.ReadAll(resp.Body) when resp.StatusCode == http.StatusOK (optionally reading a small/truncated body for error logging if needed) so you avoid reading large error payloads unnecessarily.
241-253: Prefersync.WaitGroup.Gofor cleaner goroutine management.Go 1.25+ provides
wg.Go(func())which handlesAdd/Doneautomatically. This is the preferred pattern in this repository.♻️ Proposed refactor
- var wg sync.WaitGroup - wg.Add(len(chunks)) absStart := 0 + var wg sync.WaitGroup for i, chunk := range chunks { results[i].absStart = absStart results[i].chunkLen = len(chunk) absStart += len(chunk) - go func(idx int, chunkReps [][]byte) { - defer wg.Done() + wg.Go(func() { m.SubFetchCount.Add(1) - m.runSubFetch(req, pr, chunkReps, &results[idx]) - }(i, chunk) + m.runSubFetch(req, pr, chunk, &results[i]) + }) } wg.Wait()Based on learnings: "In Go code (Go 1.25+), prefer using sync.WaitGroup.Go(func()) to run a function in a new goroutine, letting the WaitGroup manage Add/Done automatically."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router-tests/modules/entity-splitter/module.go` around lines 241 - 253, Replace the manual wg.Add(len(chunks)) + go func(...){ defer wg.Done() ... } pattern with sync.WaitGroup.Go for each chunk: remove the wg.Add call, and inside the loop call wg.Go(func() { m.SubFetchCount.Add(1); m.runSubFetch(req, pr, chunkReps, &results[idx]) }) while ensuring you capture loop variables safely (pass idx and chunk as parameters or assign to locals like i2, chunk2) so m.runSubFetch and results[i] use the correct values; keep existing assignments to results[i].absStart and results[i].chunkLen before launching the goroutine.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@router-tests/modules/entity-splitter/module.go`:
- Around line 71-79: The Module.New factory currently returns the receiver
instance m, causing all modules to share the same EntitySplitterModule (and its
atomic counters); change the New function returned in Module() to construct and
return a fresh instance (e.g., return &EntitySplitterModule{} or a new struct
populated from m's config fields) so each module gets its own counters; update
tests only if they deliberately relied on the shared instance for observability.
Ensure references to EntitySplitterModule, Module(), and ModuleInfo are used to
locate and modify the factory closure.
---
Nitpick comments:
In `@router-tests/modules/entity-splitter/module.go`:
- Around line 291-299: The code reads resp.Body into bodyBytes before checking
resp.StatusCode, causing unnecessary I/O for non-200 responses; change the logic
in the fetch handling (the block using resp, bodyBytes and out.fetchErr) to
first inspect resp.StatusCode and set out.fetchErr for non-OK statuses without
reading the full body, and only call io.ReadAll(resp.Body) when resp.StatusCode
== http.StatusOK (optionally reading a small/truncated body for error logging if
needed) so you avoid reading large error payloads unnecessarily.
- Around line 241-253: Replace the manual wg.Add(len(chunks)) + go func(...){
defer wg.Done() ... } pattern with sync.WaitGroup.Go for each chunk: remove the
wg.Add call, and inside the loop call wg.Go(func() { m.SubFetchCount.Add(1);
m.runSubFetch(req, pr, chunkReps, &results[idx]) }) while ensuring you capture
loop variables safely (pass idx and chunk as parameters or assign to locals like
i2, chunk2) so m.runSubFetch and results[i] use the correct values; keep
existing assignments to results[i].absStart and results[i].chunkLen before
launching the goroutine.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c6fdc012-9e02-45a3-896f-68fbac33edff
📒 Files selected for processing (3)
router-tests/go.modrouter-tests/modules/entity-splitter/module.gorouter-tests/modules/entity_splitter_test.go
| func (m *EntitySplitterModule) Module() core.ModuleInfo { | ||
| return core.ModuleInfo{ | ||
| ID: moduleID, | ||
| Priority: 1, | ||
| New: func() core.Module { | ||
| return m | ||
| }, | ||
| } | ||
| } |
There was a problem hiding this comment.
Module.New returns m (the receiver) instead of a fresh instance.
The New factory function in ModuleInfo returns the same receiver instance m instead of creating a new instance. This means all modules will share the same EntitySplitterModule instance, which will cause race conditions on the atomic counters when tests run in parallel or multiple routers are instantiated.
🐛 Proposed fix
func (m *EntitySplitterModule) Module() core.ModuleInfo {
return core.ModuleInfo{
ID: moduleID,
Priority: 1,
New: func() core.Module {
- return m
+ return &EntitySplitterModule{}
},
}
}Note: If the intent is to share the same instance for test observability (accessing SubFetchCount/SplitRequestCount), this is acceptable for test-only code but should be documented. The current tests pass the same splitter instance to WithCustomModules, so this works as intended for the POC.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@router-tests/modules/entity-splitter/module.go` around lines 71 - 79, The
Module.New factory currently returns the receiver instance m, causing all
modules to share the same EntitySplitterModule (and its atomic counters); change
the New function returned in Module() to construct and return a fresh instance
(e.g., return &EntitySplitterModule{} or a new struct populated from m's config
fields) so each module gets its own counters; update tests only if they
deliberately relied on the shared instance for observability. Ensure references
to EntitySplitterModule, Module(), and ModuleInfo are used to locate and modify
the factory closure.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2775 +/- ##
==========================================
+ Coverage 64.45% 65.09% +0.63%
==========================================
Files 311 251 -60
Lines 44295 25887 -18408
Branches 4764 0 -4764
==========================================
- Hits 28551 16851 -11700
+ Misses 15721 7664 -8057
- Partials 23 1372 +1349 🚀 New features to boost your workflow:
|
|
This PR was marked stale due to lack of activity. It will be closed in 14 days. |
|
This PR was marked stale due to lack of activity. It will be closed in 14 days. |
Summary
Proof-of-concept custom router module that splits large
_entitiessubgraph fetches into parallel sub-fetches and reassembles the response — demonstrating that request-level batching can be implemented entirely as a module, without engine changes.router-tests/modules/entity-splitter/and is test-only.EnginePreOriginHandler.OnOriginRequest. When the incoming body exceedsSplitThresholdbytes and carries_entitiesrepresentations, the module fans outBatchSize-sized chunks in parallel, merges results in order, and returns a synthetic*http.Responsethat short-circuits the real origin fetch.otel.GetTextMapPropagator().Injectso each sub-fetch appears as a child span of the original subgraph request._entities[i]rewritten to absolute_entities[absStart+i]).Tests
go test ./router-tests/modules/ -run TestEntitySplitterModule -race— three subtests:name:"employee-<id>"per position, proving order preservation end-to-end.data.employeesis exactly 100 positions in order withhobbies:nullat ids 41–50 and distinct names elsewhere; engine surfaces one top-level subgraph-fetch error with the 10 per-position errors nested underextensions.errors.Test plan
cd router-tests && go test ./modules/ -run TestEntitySplitterModule -race -v🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Tests